package gu.simplemq.redis;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import gu.simplemq.BaseSubscriber;
import gu.simplemq.IMessageDispatcher;
import gu.simplemq.exceptions.SmqTypeException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * {@link BaseSubscriber}的redis 实现(线程安全)<br>
 * 每个 {@link JedisPoolLazy} 实例对应保持一个 {@link RedisSubscriber} 对象<br>
 * 对象可以复用(反复打开关闭) <br>
 * 调用 {@link #close()} 取消所有订阅频道才能结束消息线程<br>
 * {@link RedisFactory}会在JVM结束时调用 {@link #close()}
 * @author guyadong
 *
 */
public class RedisSubscriber extends BaseSubscriber implements IRedisComponent {
	private final JedisPoolLazy pool;
	private RedisSubHandle jedisPubSub; 
	private final AtomicBoolean closed = new AtomicBoolean(Boolean.FALSE);
	/** 重试连接的延时(毫秒) */
	private static final int RECONNECT_DELAY_MILLS = 5000;
	/** 执行消息线程的线程池对象 */
	private static final ExecutorService subscribeExecutor = MoreExecutors.getExitingExecutorService(
			new ThreadPoolExecutor(1, 1,
	                0L, TimeUnit.MILLISECONDS,
	                new LinkedBlockingQueue<Runnable>(),
	                new ThreadFactoryBuilder().setNameFormat("stp-subscribe-%d").build()));
	/**
	 * 消息订阅(subscribe)所使用的连接对象
	 */
	private volatile Jedis jedis = null;
	/** 当前实例在redis上唯一不重复的ID名字 */
	private final String clientId;	
	@Override
	public JedisPoolLazy getPoolLazy() {
		return this.pool;
	}
	
	RedisSubscriber(JedisPoolLazy poolLazy) {
		super();
		this.jedisPubSub=new RedisSubHandle(this); 
		this.pool = poolLazy;
		this.clientId = "RedisSubscriber_" + JedisUtils.incr(poolLazy,RedisConstants.SUBSCRIBER_COUNTER);

	}

	@Override
	protected void doSubscribe(String... channels) {
		synchronized (this) {
			try{
				jedisPubSub.subscribe(channels);
			}catch(JedisConnectionException e){
				open(channels);
			}
			addSubscriber(channels);
		}
	}

	@Override
	protected void doUnsubscribe(String... channels) {
		synchronized (this) {
			if(jedisPubSub.isSubscribed()) {
				jedisPubSub.unsubscribe(channels);
			}
		}
	}
	/**
	 * 返回有效的{@link Jedis}实例<br>
	 * 确保返回的实例是有效可以连接的
	 * @return {@link Jedis}实例
	 */
	private Jedis getJedis(){
		// double check
		if(null == jedis){
			synchronized (this) {
				if(null == jedis){
					jedis = pool.borrow();
				}
			}
		}
		return jedis;
	}
	/**
	 * 释放当前的{@link Jedis}实例<br>
	 * 向{@link JedisPoolLazy}归还{@link Jedis}实例，并将{@link #jedis}置为{@code null}
	 */
	private void releaseJedis(){
		// double check
		if(jedis != null){
			synchronized (this) {
				if(jedis != null){
					pool.release(jedis);
					jedis = null;
				}
			}
		}
	}
	/**
	 * 创建消息线程,订阅指定的频道<br>
	 * @param channels 频道名列表
	 */
	private void open(final String... channels){

		Runnable run = new Runnable(){
			String[] subs = channels;
			@Override
			public void run() {
				// 如果连接异常则释放当前连接对象重新申请连接
				while(!closed.get()){
					try{
						getJedis().subscribe(jedisPubSub,subs);
						break;
					} catch (JedisConnectionException e) {
						// 释放当前连接再试
						releaseJedis();
						if(!closed.get()){
							logger.error("Subscribing failed. {}", e.getMessage());
						}
						// 延时后再试
						try {
							Thread.sleep(RECONNECT_DELAY_MILLS);
						} catch (InterruptedException e1) {
						}
						// 重新注册所有频道
						subs = channelSubs.keySet().toArray(new String[0]);
					}catch (Throwable e) {
						if(!closed.get()){
							logger.error("Subscribing failed.", e);
						}
					}
				}
			}};
		subscribeExecutor.execute(run);
	}

	public RedisSubscriber setDispatcher(IMessageDispatcher dispatcher) {
		jedisPubSub.setDispatcher(dispatcher);
		return this;
	}

	@Override
	protected String check(String name) throws SmqTypeException {
		return RedisComponentType.Channel.check(pool, name);
	}

	@Override
	public void close() {
		if(closed.compareAndSet(Boolean.FALSE, Boolean.TRUE)){
			try {
				super.close();
				releaseJedis();
			} catch (JedisConnectionException e) {
				logger.error(e.toString());
			}
		}
	}
	private void addSubscriber(Collection<String>chSet){
		for(String name:chSet){
			JedisUtils.sadd(name + RedisConstants.SUBSCRIBER_SET_SUFFIX,clientId);
		}
	}
	private void addSubscriber(String[] chSet){
		addSubscriber(Arrays.asList(chSet));
	}
}
